Published on

Axum被kafka consumer卡死解决方案记录

Authors

Airdrop 流程复跑后 report_user_behavior 卡死问题排查记录

问题现象

airdrop-web 页面里,第一次完整走完 Airdrop 流程通常是正常的:

  • cleanup_airdrop_tables 完成
  • JWT 生成成功
  • report_user_behavior -> 200
  • aggregate_airdrop_behavior -> 200
  • claim_airdrop OK
  • 链上 Claim 成功

但是当再次清理状态、准备重新走一遍 Airdrop 流程时,点击“登录”按钮后,对应的 report_user_behavior 接口会卡住,前端表现为:

[vite] http proxy error: /api/dmuser/report_user_behavior
Error: read ECONNRESET

或者请求一直 pending,不返回结果。

一开始看上去像是:

  • report_user_behavior 业务逻辑有问题
  • 数据库状态没清干净
  • Vite 代理异常
  • JWT 失效

但最后排查下来,真正问题并不在这些表面现象上。


复现路径

问题通常出现在这条完整链路之后:

  1. cleanup_airdrop_tables
  2. 生成 JWT
  3. report_user_behavior
  4. aggregate_airdrop_behavior
  5. claim_airdrop
  6. 发起链上 Claim
  7. Kafka 收到链上 claim 事件
  8. 后端消费该事件并更新数据库
  9. 再次尝试点击“登录”
  10. report_user_behavior 卡住

关键点在于:

不是第一次 report_user_behavior 卡,而是“完整走完一轮 claim 流程之后”,下一次任何新的 HTTP 请求开始异常,恰好最先暴露在“登录”这个按钮上。


最初的误判方向

一开始很容易怀疑这些方向:

1. 前端代理配置错误

前端请求通过 Vite 代理 /api/*127.0.0.1:3003
检查后发现:

  • 前端路径拼接没问题
  • 后端路由存在
  • 第一次请求能成功返回 200

所以不是简单的 URL 或 proxy 配置错误。

2. report_user_behavior 业务逻辑有 bug

因为页面上是点“登录”后卡住,所以最自然会怀疑:

  • report_user_behavior 写表死锁
  • dm_airdrop_behavior_stat 状态冲突
  • 上一次 claim 状态没清理干净

但日志继续往下看后发现:

  • 第一次 report_user_behavior 是成功的
  • 后续卡住时,甚至连外层 http request 日志都没有出现
  • 说明请求根本没进入业务 handler

所以这也不是业务表状态直接导致的。

3. 日志过多导致阻塞

.env 中曾使用过:

RUST_LOG=trace,sea_orm=debug

这确实会放大问题,因为大量日志可能拖慢服务。

但用户确认:

  • 最早在 info 级别时就已经会复现
  • 改成 trace 只是为了排查

所以日志级别不是根因,只是可能放大症状。


关键观察

排查过程中,抓到几个特别关键的现场信号。

1. 3003 端口仍然在监听

服务卡住后,ss -ltnp 仍然显示:

  • dm_server 还在监听 0.0.0.0:3003

这说明:

  • 进程没有退出
  • HTTP server 没有整体崩溃

2. 新请求连最外层日志都进不来

正常请求进入后,日志里应该有类似:

http request method=POST uri=/api/dmuser/report_user_behavior

但卡死之后:

  • 新请求建立了 TCP 连接
  • 客户端把数据发给服务端
  • 后端却没有继续打印 http request

这说明问题已经不在业务 handler,而在更靠前的 HTTP accept / request dispatch 层。

3. 卡死出现在链上 claim 事件处理之后

日志的最后一段稳定停在 Kafka 合约事件消费之后,例如:

  • 消费到 dmt_airdrop_claimed 事件
  • dmt_airdrop_claimed_handler 执行完成
  • 数据库更新成功
  • 之后 HTTP 服务不再接收新请求

这说明触发点非常明确:

不是 report_user_behavior 本身,而是 claim 事件回流后的后台 worker


真正原因

最终定位到的问题核心是:

Kafka 合约事件 worker 与 HTTP server 共用同一个 Tokio runtime,而这个 worker 内部实际上做了同步阻塞操作。

问题代码结构

contract_event_router_worker 是通过 tokio::spawn 启动的,也就是说它跑在主 HTTP runtime 里。

而它依赖的 Kafka consumer 实现大致是这样的:

pub async fn consume_message(&self) -> Result<KafkaMessage> {
    match self.consumer.poll(Duration::from_secs(1)) {
        Some(Ok(message)) => { ... }
        Some(Err(e)) => Err(...),
        None => Err(anyhow!("No message available")),
    }
}

这里最关键的问题是:

  • consume_message() 被声明成 async
  • 但里面并没有真正的异步 IO
  • 它直接调用了 BaseConsumer::poll(Duration::from_secs(1))
  • 这是同步阻塞调用

也就是说,这个函数“表面上是 async”,实际上却会阻塞 Tokio worker 线程。

为什么会影响 HTTP server

因为原始实现中:

  • HTTP server 使用 Tokio runtime
  • Kafka 合约事件 worker 也运行在同一个 runtime
  • claim 事件回流后,worker 开始执行 Kafka poll、事件分发、数据库事务、offset commit 等逻辑
  • 其中同步阻塞 poll 混在 async 执行模型里
  • 最终把共享 runtime 拖进异常状态或调度饥饿状态

结果就是:

  • 3003 端口还活着
  • 进程也没崩
  • 但 HTTP accept loop / request dispatch 不再正常工作
  • 后续新请求全部卡住

这就是为什么表面看起来像:

  • 第二次点“登录”卡死
  • report_user_behavior 有问题

但实质上是:

  • 上一轮 claim 流程触发的 Kafka 事件 worker 把主 runtime 搞挂了
  • “登录”只是下一次最先撞上的接口

为什么日志级别不是根因

日志级别不是根因,但确实会影响问题表现。

日志的作用

同步日志输出在高并发或高频 trace 场景下,确实可能进一步拖慢 runtime,甚至放大卡顿。

因此:

  • trace,sea_orm=debug 会让问题更容易暴露
  • 但不是问题产生的根本原因

真正决定性的问题

真正决定问题出现的,是这两个条件同时成立:

  1. Kafka worker 使用了同步阻塞式 poll
  2. Kafka worker 与 HTTP server 共用同一个 runtime

只要这两个条件存在,即使日志级别是 info,仍然可能复现。


解决思路

既然问题是“后台 worker 污染了主 HTTP runtime”,那解决思路就很明确:

思路一:先隔离,再精修

先不要一上来重写整套 Kafka 消费逻辑,优先做“运行时隔离”,把影响面切断:

  • 让 Kafka 合约事件 worker 运行在独立线程
  • 在线程内再创建独立 Tokio runtime
  • 不再和 HTTP server 共享同一个 runtime

这样即使 Kafka worker 内部仍然保留阻塞式 poll,也不会再影响 HTTP accept loop。

思路二:顺手处理日志输出阻塞风险

虽然日志不是根因,但同步日志写终端确实可能加剧问题。
所以一并做优化:

  • 把 tracing 输出改成非阻塞 writer
  • 避免高频日志把主线程拖慢

实际解决方案

方案 1:将 Kafka 合约事件 worker 移到独立线程/独立 runtime

原来是:

tokio::spawn(async move {
    if let Err(err) = worker.run().await {
        error!("Contract event router worker exited with error: {}", err);
    }
});

调整为:

std::thread::spawn(move || {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    runtime.block_on(async move {
        if let Err(err) = worker.run().await {
            error!("Contract event router worker exited with error: {}", err);
        }
    });
});

这样做的效果是:

  • Kafka worker 独占一个线程
  • 它的阻塞 poll 不再污染 HTTP runtime
  • claim 事件处理完后,HTTP server 仍能继续正常接新请求

方案 2:日志改为非阻塞输出

使用 tracing-appender 的 non-blocking writer,替换同步终端输出。

作用:

  • 减少高日志量时 runtime 被写日志拖慢的概率
  • 让调试日志更安全
  • 但这部分属于增强稳定性,不是本次问题的根因修复

修复后的结果

修复后进行了多次复测,完整流程如下:

  1. cleanup_airdrop_tables
  2. 生成 JWT
  3. report_user_behavior
  4. aggregate_airdrop_behavior
  5. claim_airdrop
  6. 链上 Claim
  7. 再次 cleanup
  8. 再次点击“登录”

连续多次复跑后,不再出现:

  • report_user_behavior 卡死
  • Vite 代理 ECONNRESET
  • 3003 只监听不处理请求

说明本次修复已经命中真正故障点。


为什么这个问题特别容易误导

这个问题容易误导排查方向,主要有三个原因:

1. 症状和根因不在同一个地方

症状出现在“登录按钮”上,
但根因在“上一次链上 claim 事件的后台消费”。

2. 进程和端口都活着

如果服务直接 crash,反而容易定位。
这里是:

  • 进程活着
  • 端口活着
  • 但就是不再处理请求

这类问题最像 runtime/线程模型问题,而不像普通接口 bug。

3. 第一次流程完全正常

第一次全流程 200 会让人下意识排除基础架构问题,
转而怀疑“数据库状态没清干净”或“claim 状态冲突”。
但实际上问题恰恰是第一次完整流程把后台 worker 的问题触发出来了。


最终结论

这次问题的本质不是:

  • report_user_behavior 业务逻辑错误
  • JWT 生成有问题
  • Airdrop 状态表没清理干净
  • 单纯日志级别太高

而是:

Kafka 合约事件 worker 内部使用了同步阻塞式 Kafka poll,却被放在与 HTTP server 共享的 Tokio runtime 中运行;在链上 claim 事件回流后,这个 worker 把主 runtime 拖入异常状态,导致 HTTP 服务停止接收后续请求。

修复的关键不是“改登录接口”,而是:

  • 把合约事件 worker 与 HTTP runtime 解耦
  • 让后台事件消费不再影响主服务入口
  • 辅助性地把日志改成非阻塞输出

后续建议

这次虽然已经通过“隔离 worker runtime”解决了问题,但从工程角度还有两个后续优化方向:

1. 不要把同步阻塞 Kafka poll 包装成普通 async 接口

当前这种写法非常容易误导调用方,以为它是安全的 async 操作。
更好的做法是:

  • 显式放到 blocking 线程里
  • 或改成真正适合 async 的消费模型

2. 长生命周期后台 worker 尽量不要和 HTTP server 共用关键 runtime

尤其是这类涉及:

  • Kafka poll
  • DB 事务
  • 外部网络回调
  • 链上事件处理

的 worker,更应该和 HTTP 服务隔离。


本次排查的核心经验

遇到这类“端口还在监听,但请求不再进入 handler”的问题时,优先怀疑:

  • runtime 被阻塞
  • 共享线程池被污染
  • 假 async 真阻塞
  • 后台 worker 与入口服务耦合过深

不要只盯着“最后一个表现出问题的接口”,否则很容易被表象带偏。